package com.atguigu.flink.sql.connector;

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;

/**
 * Created by 黄凯 on 2023/6/26 0026 16:49
 *
 * @author 黄凯
 * 永远相信美好的事情总会发生.
 *
 * JdbcConnector: 用于数据库数据的读写。
 *  *    1. 如果建表定义了主键， 写入时采用upsert模式
 *  *    2. 如果建表没有定义主键，写入采用append模式
 */
public class Flink05_JdbcConnector {

    public static void main(String[] args) {

        //1. 创建表环境
        TableEnvironment tableEnv =
                TableEnvironment.create(EnvironmentSettings.newInstance().inStreamingMode().build());

        //2. Jdbc Connector read
        String readSql =
                " create table t1 (" +
                        " id STRING , " +
                        " vc INT , " +
                        " ts BIGINT " +
                        ") WITH (" +
                        " 'connector' = 'jdbc', " +
                        " 'url' = 'jdbc:mysql://hadoop102:3306/test', " +
                        " 'table-name' = 's1', " +
                        " 'username' = 'root', " +
                        " 'password' = '000000' " +
                        ")" ;

        tableEnv.executeSql(readSql) ;

        Table table = tableEnv.sqlQuery("select * from t1");
        //table.execute().print();

        //3. Jdbc Connector read
        String writeSql =
                " create table t2 (" +
                        " id STRING , " +
                        " vc INT , " +
                        " ts BIGINT ," +
                        " PRIMARY KEY (id) NOT ENFORCED " +
                        ") WITH (" +
                        " 'connector' = 'jdbc', " +
                        " 'url' = 'jdbc:mysql://hadoop102:3306/test', " +
                        " 'table-name' = 's2', " +
                        " 'username' = 'root', " +
                        " 'password' = '000000' " +
                        ")" ;

        tableEnv.executeSql(writeSql);

        //从t1表查， 写入到t2
        tableEnv.executeSql("insert into t2 select * from t1") ;

    }

}
